package com.atguigu.gmall.app.dwd.log;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.util.MyKafkaUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.OutputTag;
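// Needed only if the commented-out checkpoint / state-backend block below is enabled
// (package paths assume Flink 1.13.x):
//import org.apache.flink.api.common.restartstrategy.RestartStrategies;
//import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
//import org.apache.flink.streaming.api.CheckpointingMode;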

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Detects user "jump-out" (bounce) behavior: a visit that enters a first page and is not
 * followed by a second page view within 10 seconds.
 * <p>
 * Data flow:  web/app -> Nginx -> log server (.log) -> Flume -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Kafka (DWD)
 * Components: Mock(lg.sh) -> Flume(f1) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK) -> DwdTrafficUserJumpDetail -> Kafka(ZK)
 *
 * @author : ranzlupup
 * @since : 2023/6/2 17:27
 */
public class DwdTrafficUserJumpDetail {
    public static void main(String[] args) throws Exception {
        //! 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // In production, set this to the partition count of the Kafka topic

        //1.1 Enable checkpointing
        //env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //1.2 Configure the state backend
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //! 2. Read the page-log data from Kafka and create a stream
        String topicName = "FLINK_DWD_PAGE_LOG";
        String groupId = "FLINK_DWD_PAGE_LOG_USER_JUMP_DETAIL";
        FlinkKafkaConsumer<String> flinkKafkaConsumer = MyKafkaUtils.getFlinkKafkaConsumer(topicName, groupId);
        DataStreamSource<String> kafkaDS = env.addSource(flinkKafkaConsumer);
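        // MyKafkaUtils.getFlinkKafkaConsumer is a project utility not shown here; a minimal
        // sketch of what it presumably wraps (broker address and schema are assumptions):
        // new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props)
        //   with props carrying bootstrap.servers and group.id = groupId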

        //! 3. Convert each record into a JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSONObject::parseObject);
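        // Note: JSONObject::parseObject throws on malformed input and fails the job. A more
        // defensive sketch (an assumption, not part of this pipeline) routes dirty records to
        // a hypothetical side output instead:
        // SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(
        //         new ProcessFunction<String, JSONObject>() {
        //             @Override
        //             public void processElement(String value, Context ctx, Collector<JSONObject> out) {
        //                 try {
        //                     out.collect(JSONObject.parseObject(value));
        //                 } catch (Exception e) {
        //                     ctx.output(dirtyTag, value); // dirtyTag: a hypothetical OutputTag<String>
        //                 }
        //             }
        //         });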

        //! 4. Extract the event time & key by mid (device id)
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        })
        ).keyBy(json -> json.getJSONObject("common").getString("mid"));
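        // Note: at parallelism 1 a single watermark suffices; with multiple Kafka partitions,
        // an idle partition can stall the watermark and delay the CEP timeouts below
        // (WatermarkStrategy#withIdleness is one mitigation).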

        //! 5. Define the CEP pattern sequence
        Pattern<JSONObject, JSONObject> pattern = Pattern
                .<JSONObject>begin("start")
                .where(
                        // Condition 1: the event enters a first page (no last_page_id)
                        new SimpleCondition<JSONObject>() {
                            @Override
                            public boolean filter(JSONObject value) throws Exception {
                                return value.getJSONObject("page").getString("last_page_id") == null;
                            }
                        }
                )
                .next("next")
                .where(
                        // Condition 2: a second first-page event appears, so the event
                        // matched by condition 1 is a jump-out
                        new SimpleCondition<JSONObject>() {
                            @Override
                            public boolean filter(JSONObject value) throws Exception {
                                return value.getJSONObject("page").getString("last_page_id") == null;
                            }
                        }
                )
                // If no second event arrives within 10s of the first one, the first event is
                // treated as a jump-out; within() also defines the timeout extracted in step 7
                .within(Time.seconds(10));

        // An equivalent pattern using a looping quantifier instead of two explicit steps:
/*        Pattern
                .<JSONObject>begin("start")
                .where(
                        new SimpleCondition<JSONObject>() {
                            @Override
                            public boolean filter(JSONObject value) throws Exception {
                                return value.getJSONObject("page").getString("last_page_id") == null;
                            }
                        }
                )
                .times(2) // looping pattern; default contiguity is relaxed, i.e. followedBy
                .consecutive() // switch to strict contiguity, i.e. next
                .within(Time.seconds(10));*/

        //! 6. Apply the pattern sequence to the keyed stream
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

        //! 7. Extract events (both matched events and timed-out events)
        // Anonymous subclass so the OutputTag keeps its generic type information
        OutputTag<String> timeOutTag = new OutputTag<String>("timeOut") {
        };
        SingleOutputStreamOperator<String> selectDS = patternStream.select(
                timeOutTag,
                new PatternTimeoutFunction<JSONObject, String>() {
                    @Override
                    public String timeout(Map<String, List<JSONObject>> map, long timeoutTimestamp) throws Exception {
                        // Timed out: only a first page was seen within 10s, so it is a jump-out
                        return map.get("start").get(0).toJSONString();
                    }
                },
                new PatternSelectFunction<JSONObject, String>() {
                    @Override
                    public String select(Map<String, List<JSONObject>> map) throws Exception {
                        // Matched: two consecutive first pages; the first one is a jump-out
                        return map.get("start").get(0).toJSONString();
                    }
                }
        );
        DataStream<String> timeOutDS = selectDS.getSideOutput(timeOutTag);

        //! 8. Union the two kinds of jump-out events
        DataStream<String> unionDS = selectDS.union(timeOutDS);

        //! 9. Write the result to Kafka
        selectDS.print("Select>>>>>>>");
        timeOutDS.print("TimeOut>>>>>");
        String targetTopic = "FLINK_DWD_TRAFFIC_USER_JUMP_DETAIL";
        unionDS.addSink(MyKafkaUtils.getFlinkKafkaProducer(targetTopic));
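        // MyKafkaUtils.getFlinkKafkaProducer is likewise a project utility; a minimal sketch
        // of what it presumably wraps (broker address is an assumption):
        // new FlinkKafkaProducer<>("hadoop102:9092", targetTopic, new SimpleStringSchema());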

        //! 10. Submit the job
        env.execute("DwdTrafficUserJumpDetail");
    }
}
